package com.atguigu.flink05;

import com.atguigu.beans.WaterSensor;
import com.atguigu.func.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author Felix
 * @date 2024/2/23
 * 该案例演示了JdbcSink
 */
public class Flink02_Sink_JDBC {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(1);
        //TODO 2.从指定的网络端口读取数据
        SingleOutputStreamOperator<WaterSensor> wsDS = env
                .socketTextStream("hadoop102", 8888)
                .map(new WaterSensorMapFunction());
        //TODO 3.将流中数据写到Mysql中
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.<WaterSensor>sink(
                "insert into ws values(?,?,?)",
                //给？好占位符复制
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement ps, WaterSensor ws) throws SQLException {
                        ps.setString(1, ws.getId());
                        ps.setLong(2, ws.getTs());
                        ps.setString(3, ws.getVc() + "");
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)
                        .withBatchIntervalMs(5000)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        );
        wsDS.addSink(sinkFunction);
        env.execute();
    }
}
